-
Notifications
You must be signed in to change notification settings - Fork 209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Data Files from Parquet Files to UnPartitioned Table #506
Conversation
Updates from offline discussions:
|
I just realized that this approach won't work if we want to add files from HIVE tables, because HIVE style partitioning results in parquet files that do not actually have the partition data in them. The partition columns are inferred from the directory structure. But I think the suggested approach should be favored over file path inference if it is possible. @Fokko , I'd love to get your opinion on the following:
These two modes cover some of the options that were discussed in the initial discussion of the add_files migration procedure. |
So both of the approaches have pro's and con's. One thing I would like to avoid is having to rely on Hive directly, this will make sure that we can generalize it to also import generic Parquet files. One problematic thing is that with Iceberg hidden partitioning we actually have the source-id that points to the field where the data is being kept. If the Hive partitioning is just arbitrary, eg: INSERT INTO transactions PARTITION (year = '2023') AS SELECT name, amount FROM some_other_table In this case there is no relation between the partition and any column in the table. In Iceberg you would expect something like: INSERT INTO transactions PARTITION (year = '2023') AS SELECT name, amount, created_at FROM some_other_table Where the partitioning is I would also expect the user to pre-create the partition spec prior to the import, because inferring is tricky. |
Thank you for the context @Fokko . What I meant by partition inference is the act of inferring the partition values instead of the Partition Spec itself. So this function only runs after the Iceberg Table has been created with its expected PartitionSpec. But because Hive tables have the partition values in the file paths instead of in the actual data files, I'm proposing that we have the two modes of partition value inference: one from the file paths, and the other based on the upper and lower bound values from the parquet metadata |
@syun64 I'm all for it if it works, but I see a lot of issues with inferring it from the Hive path. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good @syun64. Could you also update the docs? We could also defer the partitioning into a separate PR, up to you 👍
pyiceberg/table/__init__.py
Outdated
if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields): | ||
raise NotImplementedError("Cannot add_files to a table with Transform Partitions") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can be more permissive. It isn't a problem the table's current partitioning has something different than a IdentitiyTransform
, the issue is that we cannot add DataFiles that use this partitioning (until we find a clever way of checking this).
pyiceberg/table/__init__.py
Outdated
if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields): | ||
raise NotImplementedError("Cannot add_files to a table with Transform Partitions") | ||
|
||
if self.name_mapping() is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically you don't have to add a name-mapping if the field-IDs are set
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko Yeah I think you are right!
When field IDs are in the files, and the name_mapping is also present, the field_ids take precedence over the name_mapping in schema resolution. So the name_mapping here would essentially be meaningless in that case.
I'm on the fence between moving forward with your suggestion (create name_mapping if there are no field_ids) or whether we should always assert that the parquet files that we want to add have no field IDs. And that's because the field_ids that we actually use in our Iceberg generated parquet files, is the Iceberg Table's internal notion of field IDs. Whenever a new table gets created, new field IDs are assigned, and Iceberg keeps track of these field IDs internally to ensure that the same field can be treated the same through column renaming.
When we add_files, we are introducing files that have been produced by an external process to Iceberg, which isn't aware of Iceberg's internal fields metadata. In that sense, I feel that allowing files that have field_ids to be added could result in unexpected errors for the user that are difficult to diagnose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern is that the Parquet file and the mapping don't match. For example, there are more fields in the parquet file than in the mapping. I think it is good to add checks there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added this check here @Fokko let me know if that makes sense to you
Yeah. I don't personally need migration procedures to add files from Hive tables, but I am aware of various teams and community members that want this sort of feature to migrate to Iceberg from Hive without having to rewrite all of their files. I do think that partition inference from partition path is more complicated and has more gotchas that need to be discussed at length than the more accurate approach based on the partition metadata. I will pull that feature out and put together a follow up PR that only introduces file addition to partitioned tables using the lower and upper bounds of the partition column in the partition metadata. |
Makefile
Outdated
@@ -42,7 +42,7 @@ test-integration: | |||
docker-compose -f dev/docker-compose-integration.yml up -d | |||
sleep 10 | |||
docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py | |||
poetry run pytest tests/ -v -m integration ${PYTEST_ARGS} | |||
poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was committed by accident?
poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS} | |
poetry run pytest tests/ -v -m integration ${PYTEST_ARGS} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I always do 😅
pyiceberg/table/__init__.py
Outdated
if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields): | ||
raise NotImplementedError("Cannot add_files to a table with Transform Partitions") | ||
|
||
if self.name_mapping() is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern is that the Parquet file and the mapping don't match. For example, there are more fields in the parquet file than in the mapping. I think it is good to add checks there.
tests/integration/test_add_files.py
Outdated
df = spark.table(identifier) | ||
assert df.count() == 6, "Expected 6 rows" | ||
assert len(df.columns) == 4, "Expected 4 columns" | ||
df.show() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this was for testing, can we remove this one? .show()
is a spark action, meaning it will run the pipeline.
@syun64 Can you add this also to the docs? :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @syun64 ! Adding 2 quick comments
@@ -292,6 +292,39 @@ The nested lists indicate the different Arrow buffers, where the first write res | |||
|
|||
<!-- prettier-ignore-end --> | |||
|
|||
### Add Files | |||
|
|||
Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we mention in the doc that this procedure currently only work for unpartitioned table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe! We've already discussed the different approaches for supporting adds to partitioned tables extensively, so I'm optimistic we'll get it in before the next release. I'll put it up shortly after this is merged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds great! Thanks!
Co-authored-by: Honah J. <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @syun64 for the great work and @Fokko for reviewing!
PyIceberg's version of add_files Spark migration procedure.
Some early ideas on its implementation:
EDIT: Supporting addition of parquet files as data files to partitioned tables will be introduced in a separate PR. Options have been discussed in the comments on this PR, and we are breaking it up to make code reviews easier